package com.lagou.bak;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

/**
 * Flink CEP demo that reports two directly consecutive failed logins of the same user
 * occurring within five seconds of each other.
 *
 * <p>The job reads a fixed set of {@code CEPLoginBean} events, assigns event-time timestamps and
 * watermarks from {@code getTs()}, keys the stream by {@code getId()}, applies the
 * "fail immediately followed by fail" pattern and prints every match.
 */
public class CEPLoginTest {
    /** Value of {@code getLogresult()} that identifies a failed login attempt. */
    private static final String FAIL = "fail";

    /** How far the emitted watermark trails the largest event timestamp seen so far. */
    private static final long MAX_OUT_OF_ORDERNESS = 500L;

    /** Maximum time span, in seconds, that a complete pattern match may cover. */
    private static final long PATTERN_WINDOW_SECONDS = 5L;

    /**
     * Builds and runs the demo pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): this setter is deprecated in newer Flink releases where event time is the
        // default; it is kept so the job behaves the same on the version this file targets.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<CEPLoginBean> data = env.fromElements(
                new CEPLoginBean(1L, "fail", 1597905234000L),
                new CEPLoginBean(1L, "success", 1597905235000L),
                new CEPLoginBean(2L, "fail", 1597905236000L),
                new CEPLoginBean(2L, "fail", 1597905237000L),
                new CEPLoginBean(2L, "fail", 1597905238000L),
                new CEPLoginBean(3L, "fail", 1597905239000L),
                new CEPLoginBean(3L, "success", 1597905240000L)
        );

        SingleOutputStreamOperator<CEPLoginBean> watermarks = data.assignTimestampsAndWatermarks(
                new WatermarkStrategy<CEPLoginBean>() {
                    @Override
                    public WatermarkGenerator<CEPLoginBean> createWatermarkGenerator(
                            WatermarkGeneratorSupplier.Context context) {
                        return new BoundedLagWatermarks();
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        KeyedStream<CEPLoginBean, Long> keyed = watermarks.keyBy(value -> value.getId());

        // "start" directly followed by "next", both failed logins, within the time window.
        Pattern<CEPLoginBean, CEPLoginBean> pattern = Pattern.<CEPLoginBean>begin("start")
                .where(failedLogin())
                .next("next")
                .where(failedLogin())
                .within(Time.seconds(PATTERN_WINDOW_SECONDS));

        PatternStream<CEPLoginBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<String> process = patternStream.process(
                new PatternProcessFunction<CEPLoginBean, String>() {
                    @Override
                    public void processMatch(Map<String, List<CEPLoginBean>> match, Context ctx,
                                             Collector<String> out) throws Exception {
                        // Raw dump of the match, in addition to the formatted record below.
                        System.out.println(match);
                        List<CEPLoginBean> start = match.get("start");
                        List<CEPLoginBean> next = match.get("next");
                        String res = "start:" + start + "...next:" + next;
                        // The id of the first matched event is appended to the record.
                        out.collect(res + start.get(0).getId());
                    }
                });

        process.print();

        env.execute();
    }

    /**
     * Creates a condition that accepts events whose login result equals {@link #FAIL}.
     *
     * <p>A fresh instance is returned on every call so each pattern stage owns its condition.
     * The comparison is written constant-first so an event with a {@code null} login result is
     * rejected instead of raising a {@link NullPointerException}.
     *
     * @return a condition matching failed login events
     */
    private static IterativeCondition<CEPLoginBean> failedLogin() {
        return new IterativeCondition<CEPLoginBean>() {
            @Override
            public boolean filter(CEPLoginBean value, Context<CEPLoginBean> ctx) throws Exception {
                return FAIL.equals(value.getLogresult());
            }
        };
    }

    /**
     * Periodic watermark generator that trails the largest observed event timestamp by
     * {@link #MAX_OUT_OF_ORDERNESS}.
     */
    private static final class BoundedLagWatermarks implements WatermarkGenerator<CEPLoginBean> {
        /**
         * Largest event timestamp seen so far.
         *
         * <p>Seeded with {@code Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS} rather than
         * {@code Long.MIN_VALUE}: otherwise a periodic emit that happens before the first event
         * would compute {@code Long.MIN_VALUE - MAX_OUT_OF_ORDERNESS}, which wraps around to a
         * value close to {@code Long.MAX_VALUE} and would make every subsequent event late.
         */
        private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

        @Override
        public void onEvent(CEPLoginBean event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, event.getTs());
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS));
        }
    }
}


